-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bulk processor concurrent requests #41451
Bulk processor concurrent requests #41451
Conversation
`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that allows for simple semantics to deal with sending bulk requests. Once a bulk reaches it's pre-defined size, documents, or flush interval it will execute sending the bulk. One configurable option is the number of concurrent outstanding bulk requests. That concurrency is implemented in `org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore. However, the only code that currently calls into this code is blocked by `sychrnoized` methods. This results in the in-ability for the BulkProcessor to behave concurrently despite supporting configurable amounts of concurrent requests. This change removes the `synchronized` method in favor an explicit lock around the non-thread safe parts of the method. The call into `org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle it's own concurrency. Note - only the primary add method has been updated since this is the hot path. Currently only Watcher uses this class and the concurrency is set to 0 by default. This change allows Watcher to optionally increase the concurrency.
Pinging @elastic/es-core-features |
Hold off reviewing...I think I need to wrap the other instance swapping inside the same lock. |
@elasticmachine run elasticsearch-ci/bwc |
@martijnvg @hub-cap - this is ready for review. I have re-run the tests a few hundred more times with the code as-is and can't get it to fail. To see this change in action you need to add the following println in
You will notice that before this change, it at most ever acquired 1 permit (due to the upstream synchronous method). With this change it can consume all the permits and add to the queue. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM thank you for the detailed commit msg and note about the testing. I had one question but I will defer to those who are more well versed than I in the art of locks in java!
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments about lock usage and the testing
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments;) { | ||
//alternate between ways to add to the bulk processor | ||
futures.add(executorService.submit(() -> { | ||
if(i.get() % 2 == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this is the best way to test this. Ideally you want all of your threads ready to go prior to starting work. Usually we use a cyclic barrier or countdown latch so that we know all threads are ready prior to execution of the concurrent tasks. An example use of a cyclic barrier would be
elasticsearch/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java
Lines 843 to 878 in 0531987
public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws BrokenBarrierException, InterruptedException { | |
int numberOfThreads = randomIntBetween(2, 32); | |
final Cache<String, String> cache = CacheBuilder.<String, String>builder().build(); | |
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); | |
final String key = randomAlphaOfLengthBetween(2, 32); | |
for (int i = 0; i < numberOfThreads; i++) { | |
Thread thread = new Thread(() -> { | |
try { | |
barrier.await(); | |
for (int j = 0; j < numberOfEntries; j++) { | |
try { | |
String value = cache.computeIfAbsent(key, k -> { | |
throw new RuntimeException("failed to load"); | |
}); | |
fail("expected exception but got: " + value); | |
} catch (ExecutionException e) { | |
assertNotNull(e.getCause()); | |
assertThat(e.getCause(), instanceOf(RuntimeException.class)); | |
assertEquals(e.getCause().getMessage(), "failed to load"); | |
} | |
} | |
barrier.await(); | |
} catch (BrokenBarrierException | InterruptedException e) { | |
throw new AssertionError(e); | |
} | |
}); | |
thread.start(); | |
} | |
// wait for all threads to be ready | |
barrier.await(); | |
// wait for all threads to finish | |
barrier.await(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some more comments and suggestions. Mainly concerning the handling of InterruptedException, keeping track of all exceptions, and tightening up the start gate latch so we know all items have been added and the executor has all of its threads constructed and has started running the expected number of concurrent tasks.
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
Show resolved
Hide resolved
This comment has been minimized.
This comment has been minimized.
`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that allows for simple semantics to deal with sending bulk requests. Once a bulk reaches it's pre-defined size, documents, or flush interval it will execute sending the bulk. One configurable option is the number of concurrent outstanding bulk requests. That concurrency is implemented in `org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore. However, the only code that currently calls into this code is blocked by `synchronized` methods. This results in the in-ability for the BulkProcessor to behave concurrently despite supporting configurable amounts of concurrent requests. This change removes the `synchronized` method in favor an explicit lock around the non-thread safe parts of the method. The call into `org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle it's own concurrency.
`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that allows for simple semantics to deal with sending bulk requests. Once a bulk reaches it's pre-defined size, documents, or flush interval it will execute sending the bulk. One configurable option is the number of concurrent outstanding bulk requests. That concurrency is implemented in `org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore. However, the only code that currently calls into this code is blocked by `synchronized` methods. This results in the in-ability for the BulkProcessor to behave concurrently despite supporting configurable amounts of concurrent requests. This change removes the `synchronized` method in favor an explicit lock around the non-thread safe parts of the method. The call into `org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle it's own concurrency.
`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that allows for simple semantics to deal with sending bulk requests. Once a bulk reaches it's pre-defined size, documents, or flush interval it will execute sending the bulk. One configurable option is the number of concurrent outstanding bulk requests. That concurrency is implemented in `org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore. However, the only code that currently calls into this code is blocked by `synchronized` methods. This results in the in-ability for the BulkProcessor to behave concurrently despite supporting configurable amounts of concurrent requests. This change removes the `synchronized` method in favor an explicit lock around the non-thread safe parts of the method. The call into `org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle it's own concurrency.
7.3.0 fixes this issue by changing the locking strategy in elastic#41451. However, that change is not part of 6.x and the change here is a minimal workaround to prevent the potential of deadlock. This change will no longer retry failed bulk requests that go through the BulkProcessor for Watcher. Specifically this removes the retry logic when adding Watcher history and Triggered watches when the Bulk request failed. Related elastic#47599
7.3.0 fixes this issue by changing the locking strategy in #41451. However, that change is not part of 6.x and the change here is a minimal workaround to prevent the potential of deadlock. This change will no longer retry failed bulk requests that go through the BulkProcessor for Watcher. Specifically this removes the retry logic when adding Watcher history and Triggered watches when the Bulk request failed. Related #47599
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes elastic#47599 Note - elastic#41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes elastic#47599 Note - elastic#41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes elastic#47599 Note - elastic#41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes elastic#47599 Note - elastic#41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
* Prevent deadlock by using separate schedulers (#48697) Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried.
Is there any chance this fix will be backported to 6.8? Seems on par with the later #48697 which was backported. |
org.elasticsearch.action.bulk.BulkProcessor
is a threadsafe class thatallows for simple semantics to deal with sending bulk requests. Once a
bulk reaches it's pre-defined size, documents, or flush interval it will
execute sending the bulk. One configurable option is the number of concurrent
outstanding bulk requests. That concurrency is implemented in
org.elasticsearch.action.bulk.BulkRequestHandler
via a semaphore. However,the only code that currently calls into this code is blocked by
synchronized
methods. This results in the in-ability for the BulkProcessor to behave concurrently
despite supporting configurable amounts of concurrent requests.
This change removes the
synchronized
method in favor an explicitlock around the non-thread safe parts of the method. The call into
org.elasticsearch.action.bulk.BulkRequestHandler
is no longer blocking, whichallows
org.elasticsearch.action.bulk.BulkRequestHandler
to handle it's own concurrency.Currently the only production code that uses this class is Watcher
and it is defaulted to 0 concurrency. This change allows Watcher to optionally
increase the concurrency.
I have run this test a few thousand times and can not get it to error. Also, if I remove the lock locally the test will fail pretty quickly.
This is tangentially related to #41418 since it is the same synchronized method that can cause issues. However, this change without that fix only moves where the blocking happens (from synchronized block to semaphore block).
Adding HLRC and Watcher since outside of tests, they are the largest consumers.